package com.nx.platform.es.biz.esspider.source;

import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.MoreExecutors;
import com.nx.platform.es.biz.esspider.recycle.Recycler;
import com.nx.platform.es.biz.esspider.deliver.Deliverer;
import com.nx.platform.es.biz.esspider.entity.Message;
import com.nx.platform.es.common.utils.MoreThreadFactorys;
import lombok.extern.slf4j.Slf4j;

import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;


/**
 * @author
 * @date 2018/04/13
 */
@Slf4j
public class RecycleSourceService extends AbstractIdleService implements SourceService {


    private final Deliverer deliverer;
    private final Recycler recycler;

    private final ScheduledExecutorService executor;

    public RecycleSourceService(Deliverer deliverer, Recycler recycler) {
        this.deliverer = deliverer;
        this.recycler = recycler;
        this.executor = Executors.newSingleThreadScheduledExecutor(MoreThreadFactorys
                .newSingleThreadNameableThreadFactory("RecycleSourceService"));
    }

    @Override
    protected void startUp() throws Exception {
        executor.scheduleWithFixedDelay(this::runOneIteration, 0, 1, TimeUnit.MINUTES);
    }

    @Override
    protected void shutDown() throws Exception {
        MoreExecutors.shutdownAndAwaitTermination(executor, 1, TimeUnit.SECONDS);
    }

    protected void runOneIteration() {
        recycler.recycle(s -> parse(s).ifPresent(deliverer::deliver));
    }

    private Optional<Message> parse(String line) {
        String[] arr = line.split(",");
        if (arr.length < 2) {
            log.error("Message ignored, {}", line);
            return Optional.empty();
        }
        String type = arr[0].trim();
        Long id = Longs.tryParse(arr[1].trim());
        if (id == null) {
            log.error("Message ignored, {}", line);
            return Optional.empty();
        }
        return Optional.of(new Message(this.getClass(), type, id, null, line));
    }

}
